package flinkstudy.table;

import flinkstudy.bo.EstateInfo;
import flinkstudy.table.environment.FlinkBatchQuery;
import flinkstudy.table.mock.EstateInfoCsvDataSource;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.junit.Test;

/**
 * 离线批处理查询
 *
 * @author daocr
 * @date 2019/12/17
 */

public class BatchQuery {


    /**
     * flink 批处理sql查询
     */
    @Test
    public void flinkBatchSqlQuery() throws Exception {

        FlinkBatchQuery flinkBatchQuery = new FlinkBatchQuery();
        ExecutionEnvironment flinkBatchEnv = flinkBatchQuery.getFlinkBatchEnv();
        BatchTableEnvironment flinkBatchTableEnv = flinkBatchQuery.getFlinkBatchTableEnv();

        //1、 获取数据源
        DataSource<EstateInfo> estateInfoDataSource = EstateInfoCsvDataSource.of(flinkBatchEnv);
        //2、把数据源转换为  table
        Table estateInfoTable = flinkBatchTableEnv.fromDataSet(estateInfoDataSource);
        // 3、把表注册到 flink
        flinkBatchTableEnv.registerTable(EstateInfoCsvDataSource.T_ESTATE_INFO, estateInfoTable);
        //4 、执行sql 查询，得到查询结果
        Table table = flinkBatchTableEnv.sqlQuery("select * from " + EstateInfoCsvDataSource.T_ESTATE_INFO + " order by price desc  limit 10");
        //5、把查询结果 转换  为 dataset
        DataSet<EstateInfo> estateInfoDataSet = flinkBatchTableEnv.toDataSet(table, EstateInfo.class);
        estateInfoDataSet.print();

    }

    /**
     * flink 批处理 dsl 查询
     */
    @Test
    public void flinkBatchDslQuery() throws Exception {

        FlinkBatchQuery flinkBatchQuery = new FlinkBatchQuery();
        ExecutionEnvironment flinkBatchEnv = flinkBatchQuery.getFlinkBatchEnv();
        BatchTableEnvironment flinkBatchTableEnv = flinkBatchQuery.getFlinkBatchTableEnv();

        //1、 获取数据源
        DataSource<EstateInfo> estateInfoDataSource = EstateInfoCsvDataSource.of(flinkBatchEnv);
        //2、把数据源转换为  table
        Table estateInfoTable = flinkBatchTableEnv.fromDataSet(estateInfoDataSource);
        // 3、把表注册到 flink
        flinkBatchTableEnv.registerTable(EstateInfoCsvDataSource.T_ESTATE_INFO, estateInfoTable);
        //4、查询并且排序 最后获取前10条
        Table table = flinkBatchTableEnv.scan(EstateInfoCsvDataSource.T_ESTATE_INFO).select(EstateInfoCsvDataSource.ESTATE_INFO_COLUMN).orderBy("price.desc").fetch(10);
        //5、把查询结果 转换  为 dataset
        DataSet<EstateInfo> estateInfoDataSet = flinkBatchTableEnv.toDataSet(table, EstateInfo.class);
        estateInfoDataSet.print();

    }


}
